home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Personal Computer World 2009 February
/
PCWFEB09.iso
/
Software
/
Resources
/
Chat & Communication
/
Digsby build 37
/
digsby_setup.exe
/
lib
/
email
/
message.pyo
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2008-10-13
|
14KB
|
524 lines
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
__all__ = [
'Message']
import re
import uu
import binascii
import warnings
from cStringIO import StringIO
import email.charset as email
from email import utils
from email import errors
SEMISPACE = '; '
paramre = re.compile('\\s*;\\s*')
tspecials = re.compile('[ \\(\\)<>@,;:\\\\"/\\[\\]\\?=]')
def _formatparam(param, value = None, quote = True):
if value is not None and len(value) > 0:
if isinstance(value, tuple):
param += '*'
value = utils.encode_rfc2231(value[2], value[0], value[1])
if quote or tspecials.search(value):
return '%s="%s"' % (param, utils.quote(value))
else:
return '%s=%s' % (param, value)
else:
return param
def _parseparam(s):
plist = []
while s[:1] == ';':
s = s[1:]
end = s.find(';')
while end > 0 and s.count('"', 0, end) % 2:
end = s.find(';', end + 1)
if end < 0:
end = len(s)
f = s[:end]
if '=' in f:
i = f.index('=')
f = f[:i].strip().lower() + '=' + f[i + 1:].strip()
plist.append(f.strip())
s = s[end:]
return plist
def _unquotevalue(value):
if isinstance(value, tuple):
return (value[0], value[1], utils.unquote(value[2]))
else:
return utils.unquote(value)
class Message:
def __init__(self):
self._headers = []
self._unixfrom = None
self._payload = None
self._charset = None
self.preamble = None
self.epilogue = None
self.defects = []
self._default_type = 'text/plain'
def __str__(self):
return self.as_string(unixfrom = True)
def as_string(self, unixfrom = False):
Generator = Generator
import email.Generator
fp = StringIO()
g = Generator(fp)
g.flatten(self, unixfrom = unixfrom)
return fp.getvalue()
def is_multipart(self):
return isinstance(self._payload, list)
def set_unixfrom(self, unixfrom):
self._unixfrom = unixfrom
def get_unixfrom(self):
return self._unixfrom
def attach(self, payload):
if self._payload is None:
self._payload = [
payload]
else:
self._payload.append(payload)
def get_payload(self, i = None, decode = False):
if i is None:
payload = self._payload
elif not isinstance(self._payload, list):
raise TypeError('Expected list, got %s' % type(self._payload))
else:
payload = self._payload[i]
if decode:
if self.is_multipart():
return None
cte = self.get('content-transfer-encoding', '').lower()
None if cte == 'quoted-printable' else None<EXCEPTION MATCH>binascii.Error
if cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'):
sfp = StringIO()
try:
uu.decode(StringIO(payload + '\n'), sfp, quiet = True)
payload = sfp.getvalue()
except uu.Error:
return payload
except:
None<EXCEPTION MATCH>uu.Error
None<EXCEPTION MATCH>uu.Error
return payload
def set_payload(self, payload, charset = None):
self._payload = payload
if charset is not None:
self.set_charset(charset)
def set_charset(self, charset):
if charset is None:
self.del_param('charset')
self._charset = None
return None
if isinstance(charset, basestring):
charset = email.charset.Charset(charset)
if not isinstance(charset, email.charset.Charset):
raise TypeError(charset)
self._charset = charset
if not self.has_key('MIME-Version'):
self.add_header('MIME-Version', '1.0')
if not self.has_key('Content-Type'):
self.add_header('Content-Type', 'text/plain', charset = charset.get_output_charset())
else:
self.set_param('charset', charset.get_output_charset())
if str(charset) != charset.get_output_charset():
self._payload = charset.body_encode(self._payload)
if not self.has_key('Content-Transfer-Encoding'):
cte = charset.get_body_encoding()
try:
cte(self)
except TypeError:
self._payload = charset.body_encode(self._payload)
self.add_header('Content-Transfer-Encoding', cte)
except:
None<EXCEPTION MATCH>TypeError
None<EXCEPTION MATCH>TypeError
def get_charset(self):
return self._charset
def __len__(self):
return len(self._headers)
def __getitem__(self, name):
return self.get(name)
def __setitem__(self, name, val):
self._headers.append((name, val))
def __delitem__(self, name):
name = name.lower()
newheaders = []
for k, v in self._headers:
if k.lower() != name:
newheaders.append((k, v))
continue
self._headers = newheaders
def __contains__(self, name):
return [] in [ k.lower() for k, v in self._headers ]
def has_key(self, name):
missing = object()
return self.get(name, missing) is not missing
def keys(self):
return [ k for k, v in self._headers ]
def values(self):
return [ v for k, v in self._headers ]
def items(self):
return self._headers[:]
def get(self, name, failobj = None):
name = name.lower()
for k, v in self._headers:
if k.lower() == name:
return v
continue
return failobj
def get_all(self, name, failobj = None):
values = []
name = name.lower()
for k, v in self._headers:
if k.lower() == name:
values.append(v)
continue
if not values:
return failobj
return values
def add_header(self, _name, _value, **_params):
parts = []
for k, v in _params.items():
if v is None:
parts.append(k.replace('_', '-'))
continue
parts.append(_formatparam(k.replace('_', '-'), v))
if _value is not None:
parts.insert(0, _value)
self._headers.append((_name, SEMISPACE.join(parts)))
def replace_header(self, _name, _value):
_name = _name.lower()
for k, v in zip(range(len(self._headers)), self._headers):
if k.lower() == _name:
self._headers[i] = (k, _value)
break
continue
else:
raise KeyError(_name)
def get_content_type(self):
missing = object()
value = self.get('content-type', missing)
if value is missing:
return self.get_default_type()
ctype = paramre.split(value)[0].lower().strip()
if ctype.count('/') != 1:
return 'text/plain'
return ctype
def get_content_maintype(self):
ctype = self.get_content_type()
return ctype.split('/')[0]
def get_content_subtype(self):
ctype = self.get_content_type()
return ctype.split('/')[1]
def get_default_type(self):
return self._default_type
def set_default_type(self, ctype):
self._default_type = ctype
def _get_params_preserve(self, failobj, header):
missing = object()
value = self.get(header, missing)
if value is missing:
return failobj
params = []
for p in _parseparam(';' + value):
try:
(name, val) = p.split('=', 1)
name = name.strip()
val = val.strip()
except ValueError:
name = p.strip()
val = ''
params.append((name, val))
params = utils.decode_params(params)
return params
def get_params(self, failobj = None, header = 'content-type', unquote = True):
missing = object()
params = self._get_params_preserve(missing, header)
if params is missing:
return failobj
def get_param(self, param, failobj = None, header = 'content-type', unquote = True):
if not self.has_key(header):
return failobj
for k, v in self._get_params_preserve(failobj, header):
if k.lower() == param.lower():
if unquote:
return _unquotevalue(v)
else:
return v
unquote
return failobj
def set_param(self, param, value, header = 'Content-Type', requote = True, charset = None, language = ''):
if not isinstance(value, tuple) and charset:
value = (charset, language, value)
if not self.has_key(header) and header.lower() == 'content-type':
ctype = 'text/plain'
else:
ctype = self.get(header)
if not self.get_param(param, header = header):
if not ctype:
ctype = _formatparam(param, value, requote)
else:
ctype = SEMISPACE.join([
ctype,
_formatparam(param, value, requote)])
else:
ctype = ''
for old_param, old_value in self.get_params(header = header, unquote = requote):
append_param = ''
if old_param.lower() == param.lower():
append_param = _formatparam(param, value, requote)
else:
append_param = _formatparam(old_param, old_value, requote)
if not ctype:
ctype = append_param
continue
ctype = SEMISPACE.join([
ctype,
append_param])
if ctype != self.get(header):
del self[header]
self[header] = ctype
def del_param(self, param, header = 'content-type', requote = True):
if not self.has_key(header):
return None
new_ctype = ''
for p, v in self.get_params(header = header, unquote = requote):
if p.lower() != param.lower():
if not new_ctype:
new_ctype = _formatparam(p, v, requote)
else:
new_ctype = SEMISPACE.join([
new_ctype,
_formatparam(p, v, requote)])
new_ctype
if new_ctype != self.get(header):
del self[header]
self[header] = new_ctype
def set_type(self, type, header = 'Content-Type', requote = True):
if not type.count('/') == 1:
raise ValueError
if header.lower() == 'content-type':
del self['mime-version']
self['MIME-Version'] = '1.0'
if not self.has_key(header):
self[header] = type
return None
params = self.get_params(header = header, unquote = requote)
del self[header]
self[header] = type
for p, v in params[1:]:
self.set_param(p, v, header, requote)
def get_filename(self, failobj = None):
missing = object()
filename = self.get_param('filename', missing, 'content-disposition')
if filename is missing:
filename = self.get_param('name', missing, 'content-disposition')
if filename is missing:
return failobj
return utils.collapse_rfc2231_value(filename).strip()
def get_boundary(self, failobj = None):
missing = object()
boundary = self.get_param('boundary', missing)
if boundary is missing:
return failobj
return utils.collapse_rfc2231_value(boundary).rstrip()
def set_boundary(self, boundary):
missing = object()
params = self._get_params_preserve(missing, 'content-type')
if params is missing:
raise errors.HeaderParseError('No Content-Type header found')
newparams = []
foundp = False
for pk, pv in params:
if pk.lower() == 'boundary':
newparams.append(('boundary', '"%s"' % boundary))
foundp = True
continue
newparams.append((pk, pv))
if not foundp:
newparams.append(('boundary', '"%s"' % boundary))
newheaders = []
for h, v in self._headers:
if h.lower() == 'content-type':
parts = []
for k, v in newparams:
if v == '':
parts.append(k)
continue
parts.append('%s=%s' % (k, v))
newheaders.append((h, SEMISPACE.join(parts)))
continue
newheaders.append((h, v))
self._headers = newheaders
def get_content_charset(self, failobj = None):
missing = object()
charset = self.get_param('charset', missing)
if charset is missing:
return failobj
if isinstance(charset, tuple):
if not charset[0]:
pass
pcharset = 'us-ascii'
try:
charset = unicode(charset[2], pcharset).encode('us-ascii')
except (LookupError, UnicodeError):
charset = charset[2]
except:
None<EXCEPTION MATCH>(LookupError, UnicodeError)
None<EXCEPTION MATCH>(LookupError, UnicodeError)
try:
if isinstance(charset, str):
charset = unicode(charset, 'us-ascii')
charset = charset.encode('us-ascii')
except UnicodeError:
return failobj
return charset.lower()
def get_charsets(self, failobj = None):
return [ part.get_content_charset(failobj) for part in self.walk() ]
from email.Iterators import walk